package com.atuguigu.flink.Day03;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.commons.compress.archivers.dump.DumpArchiveEntry;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Example streaming job: reads sensor readings, keys them by sensor id and prints an alert
 * for every sensor whose temperature rises without dropping for one second of processing time.
 */
public class Example1 {
    /**
     * Builds and runs the pipeline: {@link SensorSource} -> keyBy(sensor id)
     * -> {@link TimeIncreaseAlert} -> print.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .addSource(new SensorSource())
                .keyBy(r -> r.id)
                .process(new TimeIncreaseAlert())
                .print();

        env.execute();
    }

    /**
     * Per-key alert function.
     *
     * <p>When a reading is higher than the previous one and no alert timer is pending, a
     * processing-time timer is registered one second in the future. A reading lower than the
     * previous one (or the very first reading of a key) cancels the pending timer. A reading
     * equal to the previous one changes nothing. If the timer fires, an alert string is emitted.
     */
    public static class TimeIncreaseAlert extends KeyedProcessFunction<String, SendsorReading, String> {
        /** Delay between the first observed rise and the alert, in milliseconds. */
        private static final long ALERT_DELAY_MS = 1000L;

        /** Keyed state: the most recent temperature seen for the current key. */
        private ValueState<Double> lastTemp;
        /** Keyed state: timestamp of the pending alert timer, or empty if none is registered. */
        private ValueState<Long> timerTs;

        /**
         * Obtains the keyed state handles from the runtime context.
         *
         * @param parameters configuration passed through to the superclass
         * @throws Exception if the superclass initialisation fails
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Assign the FIELDS (not new locals); otherwise they stay null and
            // processElement fails with a NullPointerException on the first record.
            lastTemp = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last_temp", Types.DOUBLE));
            timerTs = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timerTs", Types.LONG));
        }

        /**
         * Updates the stored temperature and registers or cancels the alert timer.
         *
         * @param sendsorReading the incoming reading for the current key
         * @param context        gives access to the timer service
         * @param collector      output collector (not used here; alerts are emitted in onTimer)
         * @throws Exception if state access fails
         */
        @Override
        public void processElement(SendsorReading sendsorReading, Context context, Collector<String> collector) throws Exception {
            // null means this is the first reading for the key; a genuine 0.0 reading is
            // therefore no longer confused with "no previous value".
            Double prevTemp = lastTemp.value();
            double currentTemp = sendsorReading.temperture;
            lastTemp.update(currentTemp);

            // null means no alert timer is currently registered for this key.
            Long pendingTimer = timerTs.value();

            if (prevTemp == null || currentTemp < prevTemp) {
                // First reading or a temperature drop: cancel the pending alert, if any.
                if (pendingTimer != null) {
                    context.timerService().deleteProcessingTimeTimer(pendingTimer);
                }
                timerTs.clear();
            } else if (currentTemp > prevTemp && pendingTimer == null) {
                // Temperature rose and nothing is scheduled yet: arm the alert timer.
                long fireAt = context.timerService().currentProcessingTime() + ALERT_DELAY_MS;
                context.timerService().registerProcessingTimeTimer(fireAt);
                timerTs.update(fireAt);
            }
        }

        /**
         * Emits the alert for the current key and clears the stored timer timestamp so that a
         * later rise can register a new timer.
         *
         * @param timestamp the timestamp of the firing timer
         * @param ctx       timer context; used to read the current key
         * @param out       collector receiving the alert string
         * @throws Exception if state access fails
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            out.collect("传感器"+ctx.getCurrentKey()+"温度连续上声警告");
            timerTs.clear();
        }
    }
}
